BigQueryIO uniformize direct and export reads #32360
base: master
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers` anyway.
Force-pushed from 5dbe389 to 989ccdd.
assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`. R: @robertwb for label java. Available commands: …
The PR bot will only process comments in the main thread (not review comments).
Some BQ integration tests are failing. I don't know the schema & data of the following …
    // read table schema and infer coder if possible
    Coder<T> c;
    if (getCoder() == null) {
      tableSchema = requestTableSchema(sourceDef, bqOptions, getSelectedFields());
Is it fine to access the BQ table at graph creation time? (It was already doing that when a Beam schema was requested.)
Yeah, this is a valid concern. I've heard of use cases where the pipeline submission machine has no (or incomplete) permissions on the resource, so inferring the schema at graph creation time can cause issues. The general guideline is that use cases that used to work should still work (and vice versa).
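If the snippet above is the behavior (the table schema is fetched only when no coder was set), a caller can avoid the graph-creation-time BigQuery access by supplying an explicit coder. A minimal user-side sketch, assuming a hypothetical table and field name:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExplicitCoderRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(
        BigQueryIO.<String>read(
                (SchemaAndRecord elem) -> elem.getRecord().get("name").toString())
            .from("my-project:my_dataset.my_table") // hypothetical table
            .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
            // With an explicit coder, the getCoder() == null branch above is
            // never taken, so no schema lookup happens at submission time.
            .withCoder(StringUtf8Coder.of()));
    p.run().waitUntilFinish();
  }
}
```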
@@ -1731,7 +1870,7 @@ public void processElement(ProcessContext c) throws Exception {
            .setTable(
                BigQueryHelpers.toTableResourceName(
                    queryResultTable.getTableReference()))
            .setDataFormat(DataFormat.AVRO))
Was Arrow even supported?
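For context, the pre-PR public API already let callers request a Storage Read API format via withFormat (including DataFormat.ARROW), which seems to be why the hardcoded DataFormat.AVRO on the query-result path above raises this question. A sketch of such a request, with a hypothetical query; whether Arrow was honored on this path is exactly what the comment asks:

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ArrowQueryRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<TableRow> rows =
        p.apply(
            BigQueryIO.readTableRows()
                .fromQuery("SELECT name FROM `my-project.my_dataset.my_table`") // hypothetical
                .usingStandardSql()
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                // Requesting Arrow; the hunk above suggests the query-result
                // ReadSession pinned DataFormat.AVRO regardless of this setting.
                .withFormat(DataFormat.ARROW));
    p.run().waitUntilFinish();
  }
}
```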
Resolved (outdated) review threads on:
...-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java
...ogle-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
...ogle-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
...-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -89,8 +95,8 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<Type
            .addNullableBooleanField("use_legacy_sql")
            .addNullableBooleanField("with_template_compatibility")
            .addNullableByteArrayField("bigquery_services")
            .addNullableByteArrayField("parse_fn")
            .addNullableByteArrayField("datum_reader_factory")
            .addNullableByteArrayField("bigquery_reader_factory")
This is a complex object to serialize; it's subject to serialization errors if there are changes between versions.
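To illustrate the concern: a byte-array field produced by Java serialization embeds the exact shape of the serialized class, so a structural change between Beam versions can break deserialization during a transform upgrade. A toy sketch using Beam's SerializableUtils and a hypothetical stand-in class (not the PR's actual factory type):

```java
import java.io.Serializable;
import org.apache.beam.sdk.util.SerializableUtils;

public class VersionSkewSketch {
  // Hypothetical stand-in for a reader factory stored as a byte-array field.
  static class FakeReaderFactory implements Serializable {
    final String format = "AVRO";
    // Adding or removing a field here in a later version changes the
    // serialized form; without a stable serialVersionUID and compatible
    // fields, previously written payloads fail to deserialize.
  }

  public static void main(String[] args) {
    byte[] payload = SerializableUtils.serializeToByteArray(new FakeReaderFactory());
    // Deserialization requires the same class definition on the reading side.
    FakeReaderFactory roundTrip =
        (FakeReaderFactory)
            SerializableUtils.deserializeFromByteArray(payload, "bigquery_reader_factory");
    System.out.println("round-tripped format: " + roundTrip.format);
  }
}
```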
Resolved (outdated) review thread on:
...ogle-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
Reminder, please take a look at this pr: @robertwb @ahmedabu98
Reminder, please take a look at this pr: @kennknowles @ahmedabu98
Assigning a new set of reviewers because this PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`. R: @robertwb for label java. Available commands: …
Force-pushed from adc31e2 to a7a81be.
I did some refactoring, reducing the breaking changes and allowing an easier transform upgrade.
It should be possible to read BQ Avro data using a provided compatible Avro schema for both file and direct reads.
- Add readRows API
- Improve coder inference
- Self review
- Fix concurrency issue
- spotless, checkstyle
- Ignore BigQueryIOTranslationTest
- Add missing project option to execute test
- Call table schema only if required
- Fix Avro export without logical type
- checkstyle
- Add back float support
- Fix write test
- Add Arrow support in translation
Reduce breaking changes by configuring IO with simple objects
Force-pushed from a7a81be to c73e4e0.
Force-pushed from c73e4e0 to 1c52b2a.
Reminder, please take a look at this pr: @robertwb @chamikaramj
Assigning a new set of reviewers because this PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`. R: @kennknowles for label java. Available commands: …
Reminder, please take a look at this pr: @kennknowles @damondouglas
Refers to #26329; also fixes #20100 and #21076.
When using the readWithDatumReader method with DIRECT_READ, the transform would fail because a parseFn is expected. Refactor the IO so the Avro datumReader can be used in both cases.

In some cases, it is required to get the data with a desired schema. Currently, the BQ IO always uses the writer schema (or table schema). Create new APIs to set the reader schema.

This refactoring contains some breaking changes:
- withFormat is not exposed anymore. Indeed, it is not possible to configure a TypedRead with a DatumReaderFactory and change the format later; the data format MUST be chosen when creating the transform.
- In the TypedRead.Builder, replace the DatumReaderFactory with the BigQueryReaderFactory, allowing both Avro and Arrow to be handled in a uniform fashion. This alters the BigQueryIOTranslation. I need some help on that point to handle it in a better way.

Edit: reworked part of this PR to keep compatibility.
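For illustration, a minimal sketch of the combination this PR is meant to enable: readWithDatumReader together with DIRECT_READ. The table name and Avro schema are hypothetical stand-ins, the import paths assume a Beam release with the separate Avro extension module, and the new reader-schema APIs mentioned above are not shown since their exact names are not given here:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DatumReaderDirectRead {
  public static void main(String[] args) {
    // Hypothetical Avro schema standing in for the table's schema.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"Row\",\"fields\":"
                    + "[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(
        BigQueryIO.<GenericRecord>readWithDatumReader(
                (writer, reader) -> new GenericDatumReader<>(writer, reader))
            .from("my-project:my_dataset.my_table") // hypothetical table
            // Pre-PR, this combination failed because a parseFn was expected.
            .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
            .withCoder(AvroCoder.of(schema)));
    p.run().waitUntilFinish();
  }
}
```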